\section{Stream Expression}

\href{http://mirror.ocamlcore.org/ocaml-tutorial.org/streams.html}{Link
  to streams}

Stream Expression


\inputminted[fontsize=\scriptsize]{ocaml}{lang/code/stream/test_stream.ml}



Module Stream
\begin{alternate}
Stream.npeek;;
- : int -> 'a Batteries.Stream.t -> 'a list = <fun>
Stream.next;;
- : 'a Stream.t -> 'a = <fun>
\end{alternate}



\begin{ocamlcode}
let lines_stream_of_channel chan = Stream.from (fun _ -> 
    try Some (input_line chan) with End_of_file -> None );;
val lines_stream_of_channel : BatIO.input -> string Batteries.Stream.t =
\end{ocamlcode}


It raises \textit{Stream.Failure} on an empty stream,
i.e. \textit{Stream.next}

\begin{ocamlcode}
let line_stream_of_string string =
  Stream.of_list (Str.(split (regexp "\n") string))
\end{ocamlcode}

Constructing streams \\
  \begin{bluetext}
    Stream.from
    Stream.of_list
    Stream.of_string (* char t *)
    Stream.of_channel (* char t *)
  \end{bluetext}

Consuming streams \\

\begin{bluetext}
   Stream.peek
   Stream.junk
\end{bluetext}

\begin{ocamlcode}
let paragraph lines =
  let rec next para_lines i =
    match Stream.peek lines,para_lines with
    | None, [] -> None
    | Some "", [] ->
      Stream.junk lines (* still a white paragraph *)
      next para_lines i
    | Some "", _ | None, _ ->
      Some (String.concat "\n" (List.rev para_lines)) (* a new paragraph*)
    | Some line, _ ->
      Stream.junk lines ;
      next (line :: para_line ) i in
  Stream.from (next [])    
let stream_fold f stream init = 
    let result = ref init in 
    Stream.iter (fun x -> result := f x !result) stre  am; !result;;
val stream_fold : ('a -> 'b -> 'b) -> 'a Batteries.Stream.t -> 'b -> 'b =
  <fun>
let stream_concat streams = 
  let current_stream = ref None in 
  let rec next i = 
    try 
      let stream = match !current_stream with 
        | Some stream -> stream 
        | None -> 
          let stream = Stream.next streams in 
          current_stream := Some stream ; 
          stream in 
      try Some (Stream.next stream)
      with Stream.Failure -> (current_stream := None ; next i)
    with Stream.Failure -> None in 
  Stream.from next
\end{ocamlcode}

\textit{Copying or sharing} streams \\
  This was called \textit{dup} in Enum
  \begin{ocamlcode}
(** create 2 buffers to store some pre-fetched value *)
let stream_tee stream = 
  let next self other i = 
    try 
      if Queue.is_empty self 
      then 
        let value = Stream.next stream in 
        Queue.add value other ;
        Some value
      else 
        Some (Queue.take self)
    with Stream.Failure -> None in 
  let q1,q2 = Queue.create (), Queue.create () in 
  (Stream.from (next q1 q2), Stream.from (next q2 q1))
\end{ocamlcode}

Convert arbitray data types to streams 


If the datat type defines an \textit{iter} function, and you don't
mind using threads, you can use a \textit{producer-consumer}
arrangement to invert control.

\begin{ocamlcode}
let elements iter coll = 
  let channel = Event.new_channel () in 
  let producer () = 
    let _ = iter (fun x -> Event.(sync (send channel (Some x )))) coll in 
    Event.(sync (send channel None)) in 
  let consumer i = 
    Event.(sync (receive channel)) in 
  ignore (Thread.create producer ()) ; 
  Stream.from consumer    
val elements : (('a -> unit) -> 'b -> 'c) -> 'b -> 'a Batteries.Stream.t =    
\end{ocamlcode}

Keep in mind that these techniques spawn producer threads which carry
a few risks: they only terminate when they have finished iterating,
and any change to the original data structure while iterating may
produce unexpected results.



%%% Local Variables: 
%%% mode: latex
%%% TeX-master: "../master"
%%% End: 
